CoarseGrainedSchedulerBackend
CoarseGrainedSchedulerBackend is a SchedulerBackend and ExecutorAllocationClient.
It is responsible for requesting resources from a cluster manager for executors to be able to launch tasks (on coarse-grained executors).
This backend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.
When being created, CoarseGrainedSchedulerBackend requires a Task Scheduler and an RPC Environment.
It uses LiveListenerBus.
It registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.
It tracks:

- the total number of cores in the cluster (using totalCoreCount)
- the total number of executors that are currently registered
- executors (ExecutorData)
- executors to be removed (executorsPendingToRemove)
- hosts and the number of tasks possibly running on them
- lost executors with no real exit reason
- tasks per slave (taskIdsOnSlave)
Known Implementations:

- Spark Standalone’s StandaloneSchedulerBackend
- Spark on YARN’s YarnSchedulerBackend
- Spark on Mesos’s MesosCoarseGrainedSchedulerBackend
|
Tip
|
Enable DEBUG logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=DEBUG Refer to Logging. |
Creating CoarseGrainedSchedulerBackend Instance
CoarseGrainedSchedulerBackend requires a task scheduler and an RPC Environment when being created.
It initializes the following registries:

- totalCoreCount to 0
- totalRegisteredExecutors to 0
- maxRpcMessageSize to spark.rpc.message.maxSize
- _minRegisteredRatio to spark.scheduler.minRegisteredResourcesRatio (between 0 and 1 inclusive)
- maxRegisteredWaitingTimeMs to spark.scheduler.maxRegisteredResourcesWaitingTime
- createTime to the current time
- executorDataMap to an empty collection
- numPendingExecutors to 0
- executorsPendingToRemove to an empty collection
- hostToLocalTaskCount to an empty collection
- localityAwareTasks to 0
It accesses the current LiveListenerBus and SparkConf through the constructor’s reference to TaskSchedulerImpl.
Getting Executor Ids — getExecutorIds Method
When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.
|
Note
|
It is called when SparkContext calculates executor ids. |
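For illustration, a minimal sketch of what the lookup amounts to, assuming a simple mutable map stands in for the executorDataMap registry (not Spark’s actual source):

```scala
import scala.collection.mutable

// stand-in for the executorDataMap registry: executor id -> executor data (simplified)
val executorDataMap = mutable.HashMap("0" -> "data0", "1" -> "data1")

// getExecutorIds simply returns the ids of all registered executors
def getExecutorIds(): Seq[String] = executorDataMap.keys.toSeq

// e.g. getExecutorIds() == Seq("0", "1") (in no particular order)
```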
CoarseGrainedSchedulerBackend Contract
|
Caution
|
FIXME |
doRequestTotalExecutors Method
doRequestTotalExecutors(requestedTotal: Int): Boolean = false
doRequestTotalExecutors requests requestedTotal executors from a cluster manager. It is a protected method that returns false by default (that coarse-grained scheduler backends are supposed to further customize).
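The customization pattern can be sketched with simplified stand-in types (not the actual Spark classes); the cluster-manager call is indicated only by a placeholder:

```scala
// Simplified stand-ins illustrating the default-false-plus-override pattern.
trait SketchBackend {
  // default: the backend does not know how to talk to a cluster manager
  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
}

class SketchClusterBackend extends SketchBackend {
  override protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
    // a concrete backend would forward the request to its cluster manager here
    println(s"Requesting a total of $requestedTotal executor(s) from the cluster manager")
    true  // acknowledge the request
  }
}
```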
|
Note
|
It is called when CoarseGrainedSchedulerBackend requests additional executors (using requestExecutors) or an exact number of executors (using requestTotalExecutors). In fact, all the aforementioned methods are due to the ExecutorAllocationClient contract that CoarseGrainedSchedulerBackend follows. |
|
Note
|
It is customized by the coarse-grained scheduler backends for YARN, Spark Standalone, and Mesos. |
Internal Registries
currentExecutorIdCounter Counter
currentExecutorIdCounter is the last (highest) identifier of all allocated executors.
|
Note
|
It is exclusively used in YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message.
|
executorDataMap Registry
executorDataMap = new HashMap[String, ExecutorData]
executorDataMap tracks executor data by executor id.
It uses ExecutorData that holds an executor’s endpoint reference, address, host, the number of free and total CPU cores, the URL of execution logs.
|
Note
|
A new executor (id, data) pair is added when DriverEndpoint receives RegisterExecutor message and removed when DriverEndpoint receives RemoveExecutor message or a remote host (with one or many executors) disconnects.
|
numPendingExecutors
|
Caution
|
FIXME |
numExistingExecutors
|
Caution
|
FIXME |
executorsPendingToRemove
|
Caution
|
FIXME |
localityAwareTasks
|
Caution
|
FIXME |
hostToLocalTaskCount
|
Caution
|
FIXME |
Requesting Additional Executors — requestExecutors Method
requestExecutors(numAdditionalExecutors: Int): Boolean
requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
|
Note
|
requestExecutors method is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting additional executors (as a part of a developer API for dynamic allocation of executors).
|
When called, you should see the following INFO message followed by a DEBUG message in the logs:
INFO Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
DEBUG Number of pending executors is now [numPendingExecutors]
The internal numPendingExecutors is increased by the input numAdditionalExecutors.
requestExecutors then requests executors from the cluster manager (reflecting the current computation needs). The "new executor total" is the sum of the internal numExistingExecutors and numPendingExecutors, decreased by the number of executors pending removal.
If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
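To make the arithmetic concrete, here is an illustrative calculation following the description above (the numbers are made up; the names mirror the internal registries):

```scala
// Made-up snapshot of the internal state
val numExistingExecutors = 4                 // registered executors
var numPendingExecutors = 1                  // requested but not yet registered
val executorsPendingToRemove = Set("2")      // executors awaiting removal

// requestExecutors(3) first bumps the pending counter ...
numPendingExecutors += 3

// ... and then asks the cluster manager for the new total
val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
// newTotal == 4 + 4 - 1 == 7, passed on to the cluster-specific doRequestTotalExecutors
```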
|
Note
|
It is a final method that no other scheduler backends could customize further. |
|
Note
|
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one. |
Requesting Exact Number of Executors — requestTotalExecutors Method
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
|
Note
|
requestTotalExecutors is a part of ExecutorAllocationClient Contract that SparkContext uses for requesting the exact number of executors.
|
It sets the internal localityAwareTasks and hostToLocalTaskCount registries. It then calculates the number of executors still pending, which is the input numExecutors plus the number of executors pending removal, decreased by the number of already-registered executors.
If numExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
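An illustrative calculation of the pending-executor bookkeeping described above (made-up numbers; a sketch, not Spark source):

```scala
// Made-up snapshot of the internal state
val numExecutors = 10                        // requested total
val numExistingExecutors = 6                 // already registered
val executorsPendingToRemove = Set("1", "2") // executors awaiting removal

// executors the cluster manager still has to deliver
val numPendingExecutors =
  math.max(numExecutors + executorsPendingToRemove.size - numExistingExecutors, 0)
// numPendingExecutors == 10 + 2 - 6 == 6; doRequestTotalExecutors(numExecutors) is then called
```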
|
Note
|
It is a final method that no other scheduler backends could customize further. |
|
Note
|
The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one. |
minRegisteredRatio Property
minRegisteredRatio: Double
minRegisteredRatio returns a ratio between 0 and 1 (inclusive). You can use spark.scheduler.minRegisteredResourcesRatio to control the value.
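A minimal sketch of the lookup, assuming a plain key-value configuration, a default of 0 when the property is not set, and clamping to at most 1:

```scala
// Read spark.scheduler.minRegisteredResourcesRatio from a plain config map (sketch only)
def minRegisteredRatio(conf: Map[String, String]): Double = {
  val raw = conf.get("spark.scheduler.minRegisteredResourcesRatio")
    .map(_.toDouble)
    .getOrElse(0.0)
  math.min(1.0, raw)  // never above 1
}

// minRegisteredRatio(Map.empty)                                                      == 0.0
// minRegisteredRatio(Map("spark.scheduler.minRegisteredResourcesRatio" -> "0.8"))    == 0.8
```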
Starting CoarseGrainedSchedulerBackend — start Method
start initializes CoarseGrainedScheduler RPC Endpoint.
|
Note
|
start is part of the SchedulerBackend Contract.
|
|
Note
|
The RPC Environment is passed on as a constructor parameter. |
Stopping CoarseGrainedSchedulerBackend — stop Method
The stop method stops executors and the CoarseGrainedScheduler RPC endpoint.
|
Note
|
stop is part of the SchedulerBackend Contract.
|
|
Note
|
When called with no driverEndpoint, both stop() and stopExecutors() do nothing. driverEndpoint is initialized in start, so the initialization order matters.
|
It prints the following INFO message to the logs:
INFO Shutting down all executors
It then sends StopExecutors message to driverEndpoint. It disregards the response.
It sends StopDriver message to driverEndpoint. It disregards the response.
Compute Default Level of Parallelism — defaultParallelism Method
The default parallelism is spark.default.parallelism if set, or the maximum of totalCoreCount and 2 otherwise.
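A sketch of the described fallback, assuming a plain key-value configuration stands in for SparkConf:

```scala
// spark.default.parallelism wins if set; otherwise fall back to max(totalCoreCount, 2)
def defaultParallelism(conf: Map[String, String], totalCoreCount: Int): Int =
  conf.get("spark.default.parallelism").map(_.toInt)
    .getOrElse(math.max(totalCoreCount, 2))

// defaultParallelism(Map("spark.default.parallelism" -> "8"), totalCoreCount = 16) == 8
// defaultParallelism(Map.empty, totalCoreCount = 16)                               == 16
// defaultParallelism(Map.empty, totalCoreCount = 1)                                == 2
```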
|
Note
|
defaultParallelism is part of the SchedulerBackend Contract.
|
Reviving Offers — reviveOffers Method
|
Note
|
reviveOffers is part of the SchedulerBackend Contract.
|
reviveOffers simply sends a ReviveOffers message to driverEndpoint (so it is processed asynchronously, i.e. on a separate thread, later on).
Killing Task — killTask Method
killTask simply sends a KillTask message to driverEndpoint.
|
Caution
|
FIXME Image |
|
Note
|
killTask is part of the SchedulerBackend Contract.
|
Delaying Task Launching — isReady Method
isReady is a custom implementation of isReady from the SchedulerBackend Contract that allows delaying task launching until sufficient resources are registered or spark.scheduler.maxRegisteredResourcesWaitingTime passes.
|
Note
|
isReady is used exclusively by TaskSchedulerImpl.waitBackendReady.
|
It starts by checking whether sufficient resources are available (using the sufficientResourcesRegistered method).
|
Note
|
By default sufficientResourcesRegistered always responds that sufficient resources are available.
|
If sufficient resources are available, you should see the following INFO message in the logs:
INFO SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
The method finishes returning true.
|
Note
|
minRegisteredRatio in the logs above is in the range 0 to 1 (uses spark.scheduler.minRegisteredResourcesRatio) to denote the minimum ratio of registered resources to total expected resources before submitting tasks.
|
If sufficient resources are not yet available (i.e. the above requirement does not hold), it checks whether the time since startup (tracked as createTime) has exceeded spark.scheduler.maxRegisteredResourcesWaitingTime, which allows task submission despite minRegisteredRatio not being reached yet.
You should see the following INFO message in the logs:
INFO SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)
The method finishes returning true.
Otherwise, when sufficient resources are not available and maxRegisteredWaitingTimeMs has not elapsed, it finishes returning false.
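The two-way decision can be sketched as follows (a simplification of the flow above, not Spark source):

```scala
// Ready when sufficient resources are registered, or when the waiting time has elapsed
def isReady(sufficientResourcesRegistered: Boolean,
            createTime: Long,
            maxRegisteredWaitingTimeMs: Long,
            now: Long = System.currentTimeMillis()): Boolean =
  sufficientResourcesRegistered || (now - createTime) >= maxRegisteredWaitingTimeMs
```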
sufficientResourcesRegistered Method
sufficientResourcesRegistered always responds that sufficient resources are available.
Stop All Executors — stopExecutors Method
stopExecutors sends a blocking StopExecutors message to driverEndpoint (if already initialized).
|
Note
|
It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.
|
You should see the following INFO message in the logs:
INFO CoarseGrainedSchedulerBackend: Shutting down all executors
Reset State — reset Method
reset resets the internal state:
- Sets numPendingExecutors to 0
- Clears executorsPendingToRemove
- Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message: "Stale executor after cluster manager re-registered."
reset is defined in CoarseGrainedSchedulerBackend, but is used and overridden exclusively by YarnSchedulerBackend.
Remove Executor — removeExecutor Method
removeExecutor(executorId: String, reason: ExecutorLossReason)
removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.
|
Note
|
It is called by subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend. |
CoarseGrainedScheduler RPC Endpoint — driverEndpoint
When CoarseGrainedSchedulerBackend starts, it registers CoarseGrainedScheduler RPC endpoint to be the driver’s communication endpoint.
Internally, it is a DriverEndpoint object available as the driverEndpoint internal field.
|
Note
|
CoarseGrainedSchedulerBackend is created while SparkContext is being created, which in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).
|
Internally, it is referred to as the standalone scheduler’s driver endpoint.
It tracks:

- Executor addresses (host and port) for executors (addressToExecutorId) - set when an executor connects to register itself. See RegisterExecutor RPC message.
- Total number of cores (totalCoreCount) - the sum of all cores on all executors. See RegisterExecutor RPC message.
- The number of executors available (totalRegisteredExecutors). See RegisterExecutor RPC message.
- ExecutorData for each registered executor (executorDataMap). See RegisterExecutor RPC message.
It uses driver-revive-thread daemon single-thread thread pool for …FIXME
|
Caution
|
FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.
|
- spark.scheduler.revive.interval (default: 1s) - time between reviving offers.
RPC Messages
KillTask(taskId, executorId, interruptThread)
RemoveExecutor
ReviveOffers
ReviveOffers simply passes the call on to makeOffers.
|
Caution
|
FIXME When is an executor alive? What other states can an executor be in? |
StatusUpdate
StatusUpdate(
executorId: String,
taskId: Long,
state: TaskState,
data: SerializableBuffer)
extends CoarseGrainedClusterMessage
|
Caution
|
FIXME |
StopExecutors
StopExecutors message is receive-reply and blocking. When received, the following INFO message appears in the logs:
INFO Asking each executor to shut down
It then sends a StopExecutor message to every registered executor (from executorDataMap).
RegisterExecutor
RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
hostname: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
|
Note
|
RegisterExecutor is sent when CoarseGrainedExecutorBackend (RPC Endpoint) starts.
|
Only one executor can register under executorId.
INFO Registered executor [executorRef] ([executorAddress]) with ID [executorId]
It does internal bookkeeping, updating addressToExecutorId, totalCoreCount, totalRegisteredExecutors, and executorDataMap.
When numPendingExecutors is more than 0, the following is printed out to the logs:
DEBUG Decremented number of pending executors ([numPendingExecutors] left)
It replies with RegisteredExecutor(executorAddress.host) (consult RPC Messages of CoarseGrainedExecutorBackend).
It then announces the new executor by posting SparkListenerExecutorAdded to LiveListenerBus.
Ultimately, makeOffers is called.
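The bookkeeping can be sketched with simplified registries (a sketch only; Spark’s DriverEndpoint does this inside the RPC message handler):

```scala
import scala.collection.mutable

// Simplified stand-ins for the registries described above
val addressToExecutorId = mutable.Map.empty[String, String]
val executorDataMap = mutable.Map.empty[String, Int]   // executor id -> total cores (simplified)
var totalCoreCount = 0
var totalRegisteredExecutors = 0

def registerExecutor(executorId: String, executorAddress: String, cores: Int): Unit = {
  addressToExecutorId(executorAddress) = executorId
  executorDataMap(executorId) = cores
  totalCoreCount += cores
  totalRegisteredExecutors += 1
}

// registerExecutor("1", "host1:43210", 4) leaves totalCoreCount == 4 and one registered executor
```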
DriverEndpoint
DriverEndpoint is a ThreadSafeRpcEndpoint.
onDisconnected Callback
When called, onDisconnected removes the worker from the internal addressToExecutorId registry (which effectively removes the worker from the cluster).
While removing, it calls removeExecutor with the reason being SlaveLost and message:
Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
|
Note
|
onDisconnected is called when a remote host is lost.
|
Making Resource Offers — makeOffers Method
makeOffers(): Unit
makeOffers is a private method that takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor’s id, host and free cores).
|
Caution
|
Only free cores are considered in making offers. Memory is not! Why?! |
It then requests TaskSchedulerImpl to process the resource offers, producing a collection of TaskDescription collections that it in turn uses to launch tasks.
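A simplified sketch of the offer-building step, one offer per active executor (stand-in types, not Spark’s WorkerOffer/ExecutorData):

```scala
// Stand-in types for illustration only
case class SketchExecutorData(host: String, freeCores: Int)
case class SketchWorkerOffer(executorId: String, host: String, cores: Int)

def makeOffers(executorDataMap: Map[String, SketchExecutorData]): Seq[SketchWorkerOffer] =
  executorDataMap.toSeq.map { case (id, data) =>
    SketchWorkerOffer(id, data.host, data.freeCores)   // only free cores are offered
  }
```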
Launching Tasks — launchTasks Method
launchTasks(tasks: Seq[Seq[TaskDescription]])
launchTasks is a private helper method that iterates over TaskDescription objects in the tasks input collection and …FIXME
|
Note
|
launchTasks gets called when CoarseGrainedSchedulerBackend is making resource offers.
|
Internally, it serializes each TaskDescription (using the global closure Serializer) and checks that the size of the serialized task is below maxRpcMessageSize.
|
Caution
|
FIXME Describe maxRpcMessageSize.
|
If the serialized task’s size is over the maximum RPC message size, the task’s TaskSetManager is aborted.
|
Caution
|
FIXME At that point, tasks have their executor assigned. When and how did that happen? |
If the serialized task’s size is within the limit, the task’s executor is looked up in the internal executorDataMap registry to record that the task is about to be launched, and the executor’s number of free cores is decremented by the CPUS_PER_TASK constant (i.e. spark.task.cpus).
|
Note
|
ExecutorData keeps track of the number of free cores of the executor (as freeCores) as well as the RpcEndpointRef of the executor to send tasks to launch to (as executorEndpoint).
|
You should see the following INFO in the logs:
INFO DriverEndpoint: Launching task [taskId] on executor id: [executorId] hostname: [executorHost].
Ultimately, launchTasks sends a LaunchTask message to the executor’s RPC endpoint with the serialized task (wrapped in SerializableBuffer).
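The size guard and core bookkeeping can be condensed into the following sketch (stand-in types and values; the byte limit assumes the default 128 MB message size):

```scala
// Stand-in executor record tracking free cores
case class SketchExecData(host: String, var freeCores: Int)

val cpusPerTask = 1                                  // spark.task.cpus
val maxRpcMessageSizeBytes = 128 * 1024 * 1024       // spark.rpc.message.maxSize expressed in bytes

// Returns true when the task may be launched on the executor
def tryLaunch(serializedTaskSize: Int, executor: SketchExecData): Boolean =
  if (serializedTaskSize > maxRpcMessageSizeBytes) {
    false  // the task's TaskSetManager would be aborted instead
  } else {
    executor.freeCores -= cpusPerTask                // record the cores the task occupies
    true                                             // then LaunchTask is sent to the executor
  }
```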
|
Note
|
Scheduling in Spark relies on cores only (not memory), i.e. the number of tasks Spark can run on an executor is constrained only by the number of available cores. When submitting a Spark application for execution, both memory and cores can be specified explicitly. |
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| spark.rpc.message.maxSize | 128 | Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map output size (serialized) information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size. |
| spark.scheduler.minRegisteredResourcesRatio | 0 | Double number between 0 and 1 (inclusive) that controls the minimum ratio of (registered resources / total expected resources) before submitting tasks. See isReady in this document. |
| spark.scheduler.maxRegisteredResourcesWaitingTime | 30s | Time to wait for sufficient resources available. See isReady in this document. |